package cn.itljh.process

import cn.itljh.util.OffsetUtils
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{Connection, DriverManager, PreparedStatement}
import scala.collection.mutable
/**
 * Real-time processing and analysis of epidemic-prevention material data.
 *
 * Pipeline: Kafka (topic "covid19_wz") -> Spark Streaming (stateful per-material
 * aggregation via updateStateByKey) -> MySQL (table `covid19_wz`). Kafka offsets
 * are maintained manually and persisted to MySQL through [[OffsetUtils]] so the
 * job can resume from the last processed position after a restart.
 */
object Covid19_WZData_Process {
  def main(args: Array[String]): Unit = {
    // 1. Set up the Spark Streaming environment (5-second micro-batches).
    val conf = new SparkConf().setAppName("Covid19_WZData_Process").setMaster("local[*]")
    val context = new SparkContext(conf)
    context.setLogLevel("WARN")
    val ssc = new StreamingContext(context, Seconds(5)) // batch interval
    ssc.checkpoint("./ssckp") // checkpoint dir is required by updateStateByKey

    // 2. Kafka connection parameters.
    val kafkaParams: Map[String, Object] = Map[String, Object](
      // Fixed hostname typo: "hadopp104" -> "hadoop104".
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "group.id" -> "SparkKafka",
      // auto.offset.reset semantics when no committed offset exists for the group:
      //   latest   - start consuming from the end of the log
      //   earliest - start consuming from the beginning of the log
      //   none     - throw an error
      // (If an offset IS recorded, consumption always resumes from it.)
      "auto.offset.reset" -> "latest",
      // Offsets are committed manually (persisted to MySQL in step 6).
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer]
    )
    val topics: Array[String] = Array("covid19_wz")

    // Restore previously saved offsets from MySQL: Map[TopicPartition, Long].
    val offsetsMap: mutable.Map[TopicPartition, Long] = OffsetUtils.getOffsetMap("SparkKafka", "covid19_wz")
    // 3. Connect to Kafka, resuming from the stored offsets when available.
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = if (offsetsMap.nonEmpty) {
      println("MySql记录了offset信息,从offset处开始消费")
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsetsMap)
      )
    } else {
      println("MySql没有记录了offset信息,从latest处开始消费")
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )
    }

    // 4. Parse every JSON record into a tuple keyed by material name:
    //    (name, (purchase, allocation, donation, consumption, demand, stock))
    // Example input: {"count":37,"from":"下拨","name":"医用防护服/套"}
    // Aggregation rule: stock = purchase + allocation + donation - consumption,
    // so consumption/demand are stored as negative contributions.
    val tupleDS: DStream[(String, (Int, Int, Int, Int, Int, Int))] = kafkaDS.map(record => {
      val jsonObj: JSONObject = JSON.parseObject(record.value())
      val name: String = jsonObj.getString("name")
      val from: String = jsonObj.getString("from")
      // getIntValue returns 0 for a missing/null "count" instead of throwing an
      // NPE on unboxing (getInteger returns a nullable java.lang.Integer).
      val count: Int = jsonObj.getIntValue("count")
      // Slot the count into the position matching its source.
      from match {
        case "采购" => (name, (count, 0, 0, 0, 0, count))   // purchase
        case "下拨" => (name, (0, count, 0, 0, 0, count))   // allocation
        case "捐赠" => (name, (0, 0, count, 0, 0, count))   // donation
        case "消耗" => (name, (0, 0, 0, -count, 0, -count)) // consumption
        case "需求" => (name, (0, 0, 0, 0, -count, -count)) // demand
        // Unknown source: contribute nothing rather than killing the whole
        // streaming job with a scala.MatchError on one bad record.
        case _     => (name, (0, 0, 0, 0, 0, 0))
      }
    })

    // 4.3 Stateful aggregation per material name (updateStateByKey): merge the
    // current batch's tuples element-wise into the accumulated history.
    val updateFunc = (currentValues: Seq[(Int, Int, Int, Int, Int, Int)], historyValue: Option[(Int, Int, Int, Int, Int, Int)]) => {
      if (currentValues.nonEmpty) {
        // Start from the history (or all zeros) and fold in the batch values.
        val merged = currentValues.foldLeft(historyValue.getOrElse((0, 0, 0, 0, 0, 0))) {
          case ((cg, xb, jz, xh, xq, kc), (c1, c2, c3, c4, c5, c6)) =>
            (cg + c1, xb + c2, jz + c3, xh + c4, xq + c5, kc + c6)
        }
        Some(merged)
      } else {
        // No data for this key in the current batch: keep the history as-is.
        historyValue
      }
    }
    val resultDS: DStream[(String, (Int, Int, Int, Int, Int, Int))] = tupleDS.updateStateByKey(updateFunc)

    // 5. Upsert the aggregated totals into MySQL (one connection per partition).
    /*
 CREATE TABLE `covid19_wz` (
   `name` varchar(12) NOT NULL DEFAULT '',
   `cg` int(11) DEFAULT '0',
   `xb` int(11) DEFAULT '0',
   `jz` int(11) DEFAULT '0',
   `xh` int(11) DEFAULT '0',
   `xq` int(11) DEFAULT '0',
   `kc` int(11) DEFAULT '0',
   PRIMARY KEY (`name`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
      */
    resultDS.foreachRDD(rdd => {
      rdd.foreachPartition(lines => {
        // REPLACE INTO performs an upsert keyed on the primary key `name`.
        val sql: String = "replace into covid19_wz(name,cg,xb,jz,xh,xq,kc) values(?,?,?,?,?,?,?)"
        val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
        try {
          val ps: PreparedStatement = conn.prepareStatement(sql)
          try {
            for ((name, (cg, xb, jz, xh, xq, kc)) <- lines) {
              ps.setString(1, name)
              ps.setInt(2, cg)
              ps.setInt(3, xb)
              ps.setInt(4, jz)
              ps.setInt(5, xh)
              ps.setInt(6, xq)
              ps.setInt(7, kc)
              ps.executeUpdate()
            }
          } finally {
            // Always release JDBC resources, even when a write fails; otherwise
            // each failing batch leaks a connection on this executor.
            ps.close()
          }
        } finally {
          conn.close()
        }
      })
    })

    // 6. Manually persist the consumed offset ranges to MySQL after processing.
    kafkaDS.foreachRDD(rdd => {
      // isEmpty() only inspects the first partition element; count() would
      // force a full pass over the batch just to test for emptiness.
      if (!rdd.isEmpty()) {
        // The direct stream's RDDs expose their offset ranges via HasOffsetRanges.
        val offsets: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // Alternative: commit back to Kafka's __consumer_offsets topic (and to
        // the checkpoint, if enabled) instead of MySQL:
        // kafkaDS.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
        OffsetUtils.saveOffsets(groupId = "SparkKafka", offsets)
      }
    })

    // 7. Start the streaming job and block until termination.
    ssc.start()
    ssc.awaitTermination()
  }
}
